package org.codehaus.activemq.store.bdb;

import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryDatabase;
import com.sleepycat.je.SecondaryKeyCreator;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import java.io.File;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.DefaultWireFormat;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PreparedTransactionStore;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

public class BDbPersistenceAdapter extends PersistenceAdapterSupport
{
  private static final Log log = LogFactory.getLog(BDbPersistenceAdapter.class);
  private Environment environment;
  private WireFormat wireFormat;
  private DatabaseConfig config;
  private TransactionConfig transactionConfig;
  private File directory = new File("ActiveMQ");

  public static BDbPersistenceAdapter newInstance(File directory)
    throws JMSException
  {
    return new BDbPersistenceAdapter(directory);
  }

  public BDbPersistenceAdapter()
  {
    this(null, new DefaultWireFormat());
  }

  public BDbPersistenceAdapter(File directory) {
    this();
    this.directory = directory;
  }

  public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat) {
    this(environment, wireFormat, BDbHelper.createDatabaseConfig(), new TransactionConfig());
  }

  public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat, DatabaseConfig config, TransactionConfig transactionConfig) {
    this.environment = environment;
    this.wireFormat = wireFormat;
    this.config = config;
    this.transactionConfig = transactionConfig;
  }

  public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
    try {
      Database database = createDatabase("Queue_" + destinationName);
      SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
      SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
      SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Queue_Index_" + destinationName, database, secondaryConfig);
      sequenceNumberCreator.initialise(secondaryDatabase);
      return new BDbMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, this.wireFormat.copy());
    } catch (DatabaseException e) {
    	   throw JMSExceptionHelper.newJMSException("Could not create Queue MessageContainer for destination: " + destinationName + ". Reason: " + e, e);
    	   
    }
  }

  public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException
  {
    try
    {
      Database database = createDatabase("Topic_" + destinationName);
      SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
      SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
      SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Topic_Index_" + destinationName, database, secondaryConfig);
      sequenceNumberCreator.initialise(secondaryDatabase);
      Database subscriptionDatabase = createDatabase("ConsumeAck_" + destinationName);
      return new BDbTopicMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, this.wireFormat.copy(), subscriptionDatabase);
    } catch (DatabaseException e) {
        throw JMSExceptionHelper.newJMSException("Could not create Topic MessageContainer for destination: " + destinationName + ". Reason: " + e, e);
        
    }
 }

  public PreparedTransactionStore createPreparedTransactionStore() throws JMSException
  {
    try
    {
      return new BDbPreparedTransactionStore(createDatabase("XaPrepareTxnDb"));
    } catch (DatabaseException e) {
        throw JMSExceptionHelper.newJMSException("Could not create XA Prepare Transaction Database. Reason: " + e, e);

    }
  }

  public void beginTransaction() throws JMSException
  {
    try
    {
      if (BDbHelper.getTransactionCount() == 0) {
        Transaction transaction = this.environment.beginTransaction(BDbHelper.getTransaction(), this.transactionConfig);
        BDbHelper.pushTransaction(transaction);
      }
      else {
        Transaction transaction = BDbHelper.getTransaction();
        BDbHelper.pushTransaction(transaction);
      }
    }
    catch (DatabaseException e) {
      throw JMSExceptionHelper.newJMSException("Failed to begin transaction: " + e, e);
    }
  }

  public void commitTransaction() throws JMSException
  {
    if (BDbHelper.getTransactionCount() == 1) {
      Transaction transaction = BDbHelper.getTransaction();
      if (transaction == null)
        log.warn("Attempt to commit transaction when non in progress");
      else
        try
        {
          transaction.commit();
        }
        catch (DatabaseException e) {
          throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + transaction + ": " + e, e);
        }
        finally {
          BDbHelper.popTransaction();
        }
    }
    else
    {
      BDbHelper.popTransaction();
    }
  }

  public void rollbackTransaction() {
    Transaction transaction = BDbHelper.getTransaction();
    if (transaction != null)
      if (BDbHelper.getTransactionCount() == 1) {
        try {
          transaction.abort();
        }
        catch (DatabaseException e) {
          log.warn("Cannot rollback transaction due to: " + e, e);
        }
        finally {
          BDbHelper.popTransaction();
        }
      }
      else
        BDbHelper.popTransaction();
  }

  public void start()
    throws JMSException
  {
    if (this.environment == null) {
      this.directory.mkdirs();

      log.info("Creating Berkeley DB based message store in directory: " + this.directory.getAbsolutePath());
      try
      {
        this.environment = BDbHelper.createEnvironment(this.directory);
      }
      catch (DatabaseException e) {
        throw JMSExceptionHelper.newJMSException("Failed to open Berkeley DB persistent store at directory: " + this.directory + ". Reason: " + e, e);
      }
    }
  }

  public synchronized void stop() throws JMSException
  {
    if (this.environment != null)
      try {
        this.environment.close();
      }
      catch (DatabaseException e) {
        throw JMSExceptionHelper.newJMSException("Failed to close environment. Reason: " + e, e);
      }
      finally {
        this.environment = null;
      }
  }

  public File getDirectory()
  {
    return this.directory;
  }

  public void setDirectory(File directory) {
    this.directory = directory;
  }

  public WireFormat getWireFormat() {
    return this.wireFormat;
  }

  public void setWireFormat(WireFormat wireFormat) {
    this.wireFormat = wireFormat;
  }

  public TransactionConfig getTransactionConfig() {
    return this.transactionConfig;
  }

  public void setTransactionConfig(TransactionConfig transactionConfig) {
    this.transactionConfig = transactionConfig;
  }

  public Environment getEnvironment() {
    return this.environment;
  }

  public void setEnvironment(Environment environment) {
    this.environment = environment;
  }

  public DatabaseConfig getConfig() {
    return this.config;
  }

  public void setConfig(DatabaseConfig config) {
    this.config = config;
  }

  protected Database createDatabase(String name)
    throws DatabaseException
  {
    if (log.isTraceEnabled()) {
      log.trace("Opening database: " + name);
    }
    return this.environment.openDatabase(null, name, this.config);
  }

  protected SecondaryDatabase createSecondaryDatabase(String name, Database database, SecondaryConfig secondaryConfig)
    throws DatabaseException
  {
    if (log.isTraceEnabled()) {
      log.trace("Opening secondary database: " + name);
    }
    return this.environment.openSecondaryDatabase(null, name, database, secondaryConfig);
  }

  public static JMSException closeDatabase(Database db, JMSException firstException) {
    if (db != null)
    {
      if (log.isTraceEnabled()) {
        try {
          log.trace("Closing database: " + db.getDatabaseName());
        }
        catch (DatabaseException e) {
          log.trace("Closing database: " + db + " but could not get the name: " + e);
        }
      }
      try
      {
        db.close();
      }
      catch (DatabaseException e) {
        if (firstException == null) {
          firstException = JMSExceptionHelper.newJMSException("Failed to close database. Reason: " + e, e);
        }
      }
    }
    return firstException;
  }

  protected SecondaryConfig createSecondaryConfig(SecondaryKeyCreator keyGenerator) {
    SecondaryConfig answer = new SecondaryConfig();
    answer.setKeyCreator(keyGenerator);
    answer.setAllowCreate(true);
    answer.setAllowPopulate(true);
    answer.setTransactional(true);
    return answer;
  }
}